<?php
namespace app\api\services;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\facade\Db;

class RabbitMqService
{
    static public $channel;
    static public $mqConfig;
    static public $connection;

    public static function send($event, $data)
    {
        //链接MQ服务器
        self::$mqConfig   = config('rabbitmq.hosts');
        self::$connection = new AMQPStreamConnection(
            self::$mqConfig['host'],
            self::$mqConfig['port'],
            self::$mqConfig['username'],
            self::$mqConfig['password']
        );

        $exchange      = $event["exchange"];
        self::$channel = self::$connection->channel();
        /**
         * 创建队列(Queue)
         * name: hello         // 队列名称
         * passive: false      // 如果设置true存在则返回OK，否则就报错。设置false存在返回OK，不存在则自动创建
         * durable: true       // 是否持久化，设置false是存放到内存中的，RabbitMQ重启后会丢失;设置true,则代表是一个持久化的队列，服务重启后也会存在，因为服务会把持久化的queue存放到磁盘上当服务重启的时候，会重新加载之前被持久化的queue
         * exclusive: false    // 是否排他，指定该选项为true则队列只对当前连接有效，连接断开后自动删除
         * auto_delete: false // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         * auto_delete: false // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         * nowait            //是否异步执行
         * $arguments    array()    设定消息队列的额外参数，如存活时间等  参考下例注释方法传对象，数组未知
         * $ticket    null    传0或null即可
         */
        // 和普通生产者区别 在这 下面是过期时间和转发到的路由
        /*$arguments=new AMQPTable();
        $arguments=$arguments->set(array(
            'x-dead-letter-exchange' =>  $exchange['exchange_name'],//交换机名称
            'x-dead-letter-routing-key' =>$exchange['route_key'],//路由名称
            'x-message-ttl' => 2000,
        ));
        $arguments=[];*/
        self::$channel->queue_declare($exchange['queue_name'], false, true, false, false, false, []);
        /**
         * 创建交换机(Exchange)
         * name: vckai_exchange// 交换机名称
         * type: direct        // 交换机类型，分别为direct/fanout/topic，参考另外文章的Exchange Type说明。
         * passive: false      // 如果设置true存在则返回OK，否则就报错。设置false存在返回OK，不存在则自动创建
         * durable: false      // 是否持久化，设置false是存放到内存中的，RabbitMQ重启后会丢失
         * auto_delete: false  // 是否自动删除，当最后一个消费者断开连接之后队列是否自动被删除
         */
        self::$channel->exchange_declare($exchange['exchange_name'], 'direct', false, true, false);
        // 绑定消息交换机和队列
        self::$channel->queue_bind($exchange['queue_name'], $exchange['exchange_name'], $exchange['route_key']);

        $messageBody = json_encode($data, JSON_UNESCAPED_UNICODE); //将要发送数据变为json字符串
        Db::name('test')->insert(['param' => "发送列队实体内容：" . $messageBody . "\n 路由信息：" . json_encode($exchange, JSON_UNESCAPED_UNICODE)]);
        /**
         * 创建AMQP消息类型
         * delivery_mode 消息是否持久化
         * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化
         * AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化
         */
        $message = new AMQPMessage($messageBody, ['content_type' => 'application/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

        /**
         * 发送消息
         * msg: $message            // AMQP消息内容
         * exchange: vckai_exchange // 交换机名称
         * routing_key: hello       // 路由key
         */
        self::$channel->basic_publish($message, $exchange['exchange_name'], $exchange['route_key']);
        //关闭进程
        self::stop();
        //var_dump($exchange);
    }

    public static function sendDelay($data, $seconds)
    {
        //链接MQ服务器
        self::$mqConfig   = config('rabbitmq.hosts');
        self::$connection = new AMQPStreamConnection(
            self::$mqConfig['host'],
            self::$mqConfig['port'],
            self::$mqConfig['username'],
            self::$mqConfig['password']
        );

        self::$channel = self::$connection->channel();

        $delay = config('rabbitmq.delay_order_queue');

        $ttl = 1000 * $seconds;

        //指定交换机类型为direct
        self::$channel->exchange_declare($delay['exchange_name'],
            'x-delayed-message',
            true,
            false,
            false,
        );
        $args = new AMQPTable(['x-delayed-type' => 'direct']);

        self::$channel->queue_declare($delay['queue_name'],
            false,
            true,
            false,
            false,
            false,
            $args,
        );

        self::$channel->queue_bind($delay['queue_name'], $delay['exchange_name'], $delay['route_key']);

        $arr = [
            'delivery_mode'       => AMQPMEssage::DELIVERY_MODE_PERSISTENT,
            'application_headers' => new AMQPTable(['x-delay' => $ttl]),
        ];

        $messageBody = json_encode($data, JSON_UNESCAPED_UNICODE);

        $message = new AMQPMessage($messageBody, $arr);

        // 发送消息到延迟交换器
        self::$channel->basic_publish($message, $delay['exchange_name'], $delay['route_key']);

        self::stop();
    }

    //关闭进程
    public static function stop()
    {
        self::$channel->close();
        self::$connection->close();
    }

}
